#!/usr/bin/env python3

'Run QUAC tests. Return success if everything passed, failure otherwise.'

# Copyright (c) Los Alamos National Security, LLC, and others.

help_epilogue = '''
This script is used to test QUAC. You can run all the tests, or those in a
specific module, script, or cmdtest script. Examples:

  $ ./runtests
  $ ./runtests lib/tweet.py
  $ ./runtests bin/wp-get-dumps
  $ ./runtests tests/standalone/hello.script

A few modules have interactive tests. These are good for drawing pictures and
whatnot that need human interpretation to determine correctness.

Specify --force to invoke Python module tests with the "manual" dependency.
That is, this dependency is used to mark tests that are known to fail or have
some other reason to avoid routine testing.

The script prints a summary line when complete. It returns 0 (success) if
everything testable passed, and 1 (failure) otherwise. Reasons that something
is not testable include missing dependencies or no tests. Note that import
problems do cause failure.'''


import argparse
from collections import Counter, defaultdict
import glob
import io
import os.path
import re
import shutil
import subprocess
import sys
import tempfile

ap = argparse.ArgumentParser(description=__doc__, epilog=help_epilogue,
                             formatter_class=argparse.RawTextHelpFormatter)
ap.add_argument('--force',
                action='store_true',
                help='also test modules with testing disabled')
ap.add_argument('--interactive',
                action='store_true',
                help='run interactive tests instead of non-interactive (currently disabled)')
ap.add_argument('--noclean',
                action='store_true',
                help="don't remove old test runs from test directory")
ap.add_argument('--tmp',
                help="tempdir for tests (default: inside QUAC source tree)")
ap.add_argument('target',
                help='modules, scripts, and/or cmdtests to run (default: all)',
                nargs='*',
                metavar='FILE')
args = ap.parse_args()
quacbase = os.path.dirname(os.path.realpath(__file__))
os.chdir(quacbase)

# paths need to be relative and start with ./
for (i, t) in enumerate(args.target):
   if (t[:1] == '/'):
      ap.error('targets must be relative paths')
   if (t[:2] != './'):
      args.target[i] = './' + args.target[i]

# Default temporary directory is within the QUAC checkout because typical
# $TMPDIR settings are not shared between nodes in a cluster, which we need
# for some tests. On the other hand, sometimes using the one in the QUAC
# checkout has issues (e.g., if it's a read-only mount).
if (args.tmp):
   tmpdir = os.path.join(os.path.expanduser(args.tmp), 'quactest')
else:
   tmpdir = os.path.join(quacbase, 'tests/tmp')
if (not args.noclean and os.path.exists(tmpdir)):
   print('### removing old test temporary data')
   shutil.rmtree(tmpdir)
   os.mkdir(tmpdir)
os.environ['TMPDIR'] = tmpdir

# There is a bug in certain versions of the readline library which causes
# terminal escape codes to be printed on "import readline". This messes up the
# tests. The workaround is to delete the TERM environment variable. We do this
# always because it doesn't cause problems if the bug isn't present. See:
# http://reinout.vanrees.org/weblog/2009/08/14/readline-invisible-character-hack.html
# http://www.incenp.org/notes/2012/python-term-smm-fix.html
del os.environ['TERM']

# Used to remember which ready.sh scripts we have already tried, because some
# of them take a few seconds to run. We are lame and use a global variable.
ready_tried = defaultdict(lambda: None)

# Python modules can declare dependencies too. We store the results here; if
# true, then the dependency is satisfied. Values are set below in
# pydeps_set_status().
pydeps = { 'geo':     None,
           'icu':     None,
           'manual':  None }


### main ###

def main():
   pydeps_set_status()
   status = Counter()
   status += testloop('Python modules in lib', './lib', test_module)
   status += testloop('Python scripts in bin', './bin', test_script)
   status += testloop('cmdtest scripts in tests', './tests', test_cmdtest)
   retval = 0
   status['fail_alert'] = ''
   status['import_fail_alert'] = ''
   if (status['fail'] > 0):
      retval = 1
      status['fail_alert'] = '<-- warning!'
   if (status['import_fail'] > 0):
      status['import_fail_alert'] = '<-- warning!'
      retval = 1
   print('''\

### summary:
###   succeeded:        %(win)3d
###   failed:           %(fail)3d %(fail_alert)s
###   import failures:  %(import_fail)3d %(import_fail_alert)s
###   no tests:         %(notest)3d
###   python skips:     %(disabled)3d
###   cmdtest skips:    %(cmdtest_skip)3d''' % (status))
   print('###\n### tmpdir: %s' % tmpdir)
   sys.exit(retval)


### classes ###

class Failure(Exception):
   def __init__(self, reason):
      self.reason = reason


### other functions ###

def cmdtest_check_ready(filename):
   suite = os.path.dirname(filename)
   if (ready_tried[suite] is None):
      try:
         subprocess.check_call([os.path.join(suite, 'ready.sh')])
         ready_tried[suite] = True
         print('...', end=' ')
      except subprocess.CalledProcessError:
         ready_tried[suite] = False
         print('... missing dependencies')
         raise Failure('cmdtest_skip')
   elif (ready_tried[suite]):
      print('...', end=' ')
   else:
      print('missing dependencies (cached)')
      raise Failure('cmdtest_skip')

def file_to_module(filename):
   'Given a filename, return the corresponding module name.'
   m = re.search(r'^(\./(lib|bin/)?)?(.+)+\.py$', filename)
   return m.group(3).replace('/', '.')

def import_test(filename, text):
   '''Test that imports in filename work. We do this by extracting all
      imports, including those in doctests, into a temporary file and trying
      to import that.'''
   imfname = os.path.join(os.path.dirname(filename), 'IMPORTTEST.py')
   imfp = open(imfname, 'wt')
   for line in text.splitlines(True):
      m = re.search(r'^([ >]*)((import|from \w+ import) .+$)', line)
      if (m):
         print(m.group(2), file=imfp)
   imfp.flush()
   imout = subprocess.check_output(('python3 -m %s || true'
                                    % (file_to_module(imfname))),
                                   shell=True,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
   imfp.close()
   os.unlink(imfname)  # comment to leave file for debugging
   if (len(imout) > 0):
      print()
      print(imout, end=' ')
      raise Failure('import_fail')

def is_testable(text):
   # Is there a testable.register() call?
   if (re.search(r'testable\.register\(', text) is None):
      print('no tests')
      raise Failure('notest')
   # Are there any dependencies that are unsatisfied?
   m = re.search(r'^\s*# Test-Depends: (.+)$', text, re.MULTILINE)
   if (m is not None):
      missing = sorted(dep for dep in m.group(1).split() if not pydeps[dep])
      if (len(missing) > 0):
         print('missing dependencies:', ' '.join(missing))
         raise Failure('disabled')

def pydep_geo():
   try:
      import django.contrib.gis
   except ImportError:
      return False
   return True

def pydep_icu():
   try:
      import icu
   except ImportError:
      return False
   return True

def pydep_manual():
   return args.force

def pydeps_set_status():
   for dep in pydeps:
      pydeps[dep] = globals()['pydep_' + dep]()

def test_cmdtest(filename):
   # Silently return if not a cmdtest script
   if (not re.search(r'\.script$', filename)):
      raise Failure('ignore')
   # Start test
   print(filename[2:], end=' ')
   # Do we have the dependencies (is it "ready")?
   cmdtest_check_ready(filename)
   sys.stdout.flush()
   # Run test
   #
   # FIXME: cmdtest is wrapped by "script" in order to prevent it writing to
   # /dev/tty, which is impossible to redirect. This will not work on OS X
   # because script takes different arguments there. See issue 76.
   testout = subprocess.check_output(
      ("cd %s && script -q -c 'cmdtest -k -t %s .' /dev/null || true"
       % (os.path.dirname(filename), os.path.basename(filename)[:-7])),
      shell=True,
      stderr=subprocess.STDOUT,
      universal_newlines=True)
   if (testout.find('0 failures') < 0):
      if (len(testout.splitlines()) > 12):
         print('FAILED, large diff not printed')
      else:
         print()
         print(testout, end=' ')
      raise Failure('fail')
   # Success!
   print('ok')

def test_module(filename):
   # Silently return if it's not a Python module or not requested for testing
   if (not re.search(r'(?<!__init__)\.py$', filename)):
      raise Failure('ignore')
   # Start test
   print(filename[2:], '...', end=' ')
   sys.stdout.flush()
   module = file_to_module(filename)
   text = io.open(filename, 'rt', encoding='UTF-8').read()
   # Check prerequisites for testing
   is_testable(text)
   import_test(filename, text)
   # Run tests
   if (args.interactive):
      subprocess.call(['python3', '-c',
                       'import %s ; %s.test_interactive()' % (module, module)])
   else:
      testout = subprocess.check_output(['python3', '-m', module],
                                        universal_newlines=True)
      if (len(testout) > 0):
         print()
         print(testout, end=' ')
         raise Failure('fail')
   # Success!
   print('ok')

def test_script(filename):
   # Is it a Python script? If not, ignore it.
   bytes_ = io.open(filename, 'rb').read()
   if (not re.search(rb'^#!.+python', bytes_)):
      raise Failure('ignore')
   # Start test
   text = bytes_.decode('utf8')
   print(filename[2:], '...', end=' ', flush=True)
   # Does it import quacpath?
   if (not re.search(r'^(import|from) quacpath', text, re.MULTILINE)):
      print()
      print('Does not import quacpath')
      return Counter(fail=1)
   # Check prerequisites for testing
   is_testable(text)
   import_test(filename, text)
   # Run tests (check both old- and new-style capability)
   testout = subprocess.check_output([filename, '--unittest'],
                                     universal_newlines=True)
   if (len(testout) > 0):
      print()
      print(testout, end=' ')
      raise Failure('fail')
   # Success!
   print('ok')

def testloop(name, dir_, test_func):
   print('### testing %s' % (name))
   status = Counter()
   oldcwd = os.getcwd()
   os.chdir(dir_)
   for filename in walk('.'):
      try:
         test_func(filename)
      except Failure as x:
         status[x.reason] += 1
      else:
         status['win'] += 1
   os.chdir(oldcwd)
   return status

def walk(top):
   '''Wrap os.walk() to produce a sequence of filenames, not triples. Also, if
      args.target is non-empty, include only named files.'''
   for (dirpath, dirnames, filenames) in os.walk(top):
      dirnames.sort()
      filenames.sort()
      for filename in filenames:
         # To figure out if a given path is in the list of targets -- which
         # was created when cwd was different -- we canonicalize both. It's
         # kind of verbose and we rebuild a list on every iteration. Bleah...
         if (len(args.target) == 0
             or (os.path.abspath(os.path.join(dirpath, filename))
                 in [os.path.abspath(os.path.join(quacbase, i))
                     for i in args.target])):
            yield os.path.join(dirpath, filename)


### bootstrap ###

if (__name__ == '__main__'):
   main()

#  LocalWords:  notest IMPORTTEST tok failalert failimport failimportalert
#  LocalWords:  quacpath unittest ArgumentParser
